package com.dce.blockchain.web.service;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.java_websocket.WebSocket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import com.alibaba.fastjson.JSON;
import com.dce.blockchain.web.model.Block;
import com.dce.blockchain.web.model.Message;
import com.dce.blockchain.web.util.BlockCache;
import com.dce.blockchain.web.util.BlockConstant;
import com.dce.blockchain.websocket.P2PClient;
import com.dce.blockchain.websocket.P2PServer;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * p2p网络服务类
 *
 *
 */
@Service
public class P2PService implements ApplicationRunner {

    private ScheduledExecutorService scheduledExecutor;
    
    @Autowired
    BlockService blockService;
    
    @Autowired
    BlockCache blockCache;
    
    @Autowired
    P2PServer p2PServer;
    
    @Autowired
    P2PClient p2PClient;

    /**
     * 客户端和服务端共用的消息处理方法
     * @param webSocket
     * @param msg
     * @param sockets
     */
    public void handleMessage(WebSocket webSocket, String msg, List<WebSocket> sockets) {
        try {
            Message message = JSON.parseObject(msg, Message.class);
            System.out.println("IP address:" +webSocket.getRemoteSocketAddress().getAddress().toString()
                    +",portnumber:"+ webSocket.getRemoteSocketAddress().getPort() + "p2p message:"
                    + JSON.toJSONString(message));
            switch (message.getType()) {
            //客户端请求查询最新的区块:1
            case BlockConstant.QUERY_LATEST_BLOCK:
                write(webSocket, responseLatestBlockMsg());//服务端调用方法返回最新区块:2
                break;
            //接收到服务端返回的最新区块:2
            case BlockConstant.RESPONSE_LATEST_BLOCK:
                handleBlockResponse(message.getData(), sockets);
                break;
            //客户端请求查询整个区块链:3
            case BlockConstant.QUERY_BLOCKCHAIN:
                write(webSocket, responseBlockChainMsg());//服务端调用方法返回最新区块:4
                break;
            //直接接收到其他节点发送的整条区块链信息:4
            case BlockConstant.RESPONSE_BLOCKCHAIN:
                handleBlockChainResponse(message.getData(), sockets);
                break;
            }
        } catch (Exception e) {
            System.out.println("IP address:" +webSocket.getRemoteSocketAddress().getAddress().toString()
                +"portnumber:"+ webSocket.getRemoteSocketAddress().getPort() + "wrong p2p message:" 
                + e.getMessage());
        }
    }

    /**
     * 处理其它节点发送过来的区块信息
     * @param blockData
     * @param sockets
     */
    public synchronized void handleBlockResponse(String blockData, List<WebSocket> sockets) {
        //反序列化得到其它节点的最新区块信息
        Block latestBlockReceived = JSON.parseObject(blockData, Block.class);
        //当前节点的最新区块
        Block latestBlock = blockCache.getLatestBlock();
        
        if (latestBlockReceived != null) {
            if(latestBlock != null) {
                //如果接收到的区块高度比本地区块高度大的多
                if(latestBlockReceived.getIndex() > latestBlock.getIndex() + 1) {
                    broatcast(queryBlockChainMsg());
                    System.out.println("search all blockchain of node");
                }else if (latestBlockReceived.getIndex() > latestBlock.getIndex() && 
                        latestBlock.getHash().equals(latestBlockReceived.getPreviousHash())) {
                    if (blockService.addBlock(latestBlockReceived)) {
                        broatcast(responseLatestBlockMsg());
                    }
                    System.out.println("将新接收到的区块加入到本地的区块链");
                }
            }else if(latestBlock == null) {
                broatcast(queryBlockChainMsg());
                System.out.println("重新查询所有节点上的整条区块链");
            }
        }
    }
    
    /**
     * 处理其它节点发送过来的区块链信息
     * @param blockData
     * @param sockets
     */
    public synchronized void handleBlockChainResponse(String blockData, List<WebSocket> sockets) {
        //反序列化得到其它节点的整条区块链信息
        List<Block> receiveBlockchain = JSON.parseArray(blockData, Block.class);
        if(!CollectionUtils.isEmpty(receiveBlockchain) && blockService.isValidChain(receiveBlockchain)) {
            //根据区块索引先对区块进行排序
            Collections.sort(receiveBlockchain, new Comparator<Block>() {
                public int compare(Block block1, Block block2) {
                    return block1.getIndex() - block2.getIndex();
                }
            });
            
            //其它节点的最新区块
            Block latestBlockReceived = receiveBlockchain.get(receiveBlockchain.size() - 1);
            //当前节点的最新区块
            Block latestBlock = blockCache.getLatestBlock();
            
            if(latestBlock == null) {
                //替换本地的区块链
                blockService.replaceChain(receiveBlockchain);
            }else {
                //其它节点区块链如果比当前节点的长，则处理当前节点的区块链
                if (latestBlockReceived.getIndex() > latestBlock.getIndex()) {
                    if (latestBlock.getHash().equals(latestBlockReceived.getPreviousHash())) {
                        if (blockService.addBlock(latestBlockReceived)) {
                            broatcast(responseLatestBlockMsg());
                        }
                        System.out.println("将新接收到的区块加入到本地的区块链");
                    } else {
                        // 用长链替换本地的短链
                        blockService.replaceChain(receiveBlockchain);
                    }
                }
            }
        }
    }
    
    /**
     * 全网广播消息
     * @param message
     */
    public void broatcast(String message) {
        List<WebSocket> socketsList = this.getSockets();
        if (CollectionUtils.isEmpty(socketsList)) {
            return;
        }
        System.out.println("======全网广播消息开始：");
        for (WebSocket socket : socketsList) {
            this.write(socket, message);
        }
        System.out.println("======全网广播消息结束");
    }
    
    /**
     * 向其它节点发送消息
     * @param ws
     * @param message
     */
    public void write(WebSocket ws, String message) {
        System.out.println("send to:" +ws.getRemoteSocketAddress().getAddress().toString() 
            + "portnumber:"+ws.getRemoteSocketAddress().getPort() + "p2p meaasge:" + message);
        ws.send(message);
    }

    /**
     * 查询整条区块链
     * @return
     */
    public String queryBlockChainMsg() {
        return JSON.toJSONString(new Message(BlockConstant.QUERY_BLOCKCHAIN));
    }
    
    /**
     * 返回整条区块链数据
     * @return
     */
    public String responseBlockChainMsg() {
        Message msg = new Message();
        msg.setType(BlockConstant.RESPONSE_BLOCKCHAIN);
        msg.setData(JSON.toJSONString(blockCache.getBlockChain()));
        return JSON.toJSONString(msg);
    }

    /**
     * 查询最新的区块
     * @return
     */
    public String queryLatestBlockMsg() {
        return JSON.toJSONString(new Message(BlockConstant.QUERY_LATEST_BLOCK));
    }
    
    /**
     * 返回最新的区块
     * @return
     */
    public String responseLatestBlockMsg() {
        Message msg = new Message();
        msg.setType(BlockConstant.RESPONSE_LATEST_BLOCK);
        Block b = blockCache.getLatestBlock();
        msg.setData(JSON.toJSONString(b));
        return JSON.toJSONString(msg);
    }
    
    public List<WebSocket> getSockets(){
        return blockCache.getSocketsList();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        p2PServer.initP2PServer(blockCache.getP2pport());
        p2PClient.connectToPeer(blockCache.getAddress());

        // 启动定时任务，每隔一定时间发送心跳消息
        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        int heartbeatInterval = 5000; // 心跳间隔时间，单位为毫秒
        scheduledExecutor.scheduleAtFixedRate(this::sendHeartbeat, 0, heartbeatInterval, TimeUnit.MILLISECONDS);

        System.out.println("*****difficulty******"+blockCache.getDifficulty());
        System.out.println("*****portnumber******"+blockCache.getP2pport());
        System.out.println("*****node address******"+blockCache.getAddress());
        
    }

    /**
     * 发送心跳消息给其他节点
     */
    private void sendHeartbeat() {
        List<WebSocket> socketsList = this.getSockets();
        if (CollectionUtils.isEmpty(socketsList)) {
            return;
        }

        String heartbeatMessage = "Heartbeat message"; // 心跳消息内容
        for (WebSocket socket : socketsList) {
            //this.write(socket, heartbeatMessage);
            p2PServer.initP2PServer(blockCache.getP2pport());
            p2PClient.connectToPeer(blockCache.getAddress());
        }
    }
}